package com.xdl.apitest.tabletest

import com.xdl.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

/**
 * Minimal Flink Table API / SQL example: reads comma-separated sensor readings from a
 * text file, keeps the `id` and `temperature` of the rows whose id is `sensor_1`, and
 * prints them.
 *
 * Project: FlinkTutorial
 * Package: com.xdl.apitest.tabletest
 * Version: 1.0
 *
 * Created by guoxiaolong on 2020-08-01-15:01
 */
object Example {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath: String =
    "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Builds and runs the job.
   *
   * @param args optional; when present, `args(0)` is used as the input file path
   *             instead of [[DefaultInputPath]]
   */
  def main(args: Array[String]): Unit = {

    // 0. Create the streaming environment, read the data and map it to the case class.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the raw lines from a file (path overridable via the first program argument).
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to hold three comma-separated fields:
    // a string, a long and a double, in that order.
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })

    // 1. Create the table environment on top of the streaming environment.
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 2. Convert the stream into a table.
    val dataTable: Table = tableEnv.fromDataStream(dataStream)

    // 3. Transform the table; 3.1 and 3.2 express the same query in two ways.
    // 3.1 Table API. The string literal must be quoted: an unquoted `sensor_1`
    //     is parsed as a column reference rather than as a string value.
    //     This table is built for illustration only; it is not converted or printed below.
    val resultTable: Table = dataTable
      .select("id, temperature")
      .filter("id == 'sensor_1'")

    // 3.2 SQL. The table has to be registered under a name before it can be queried.
    tableEnv.createTemporaryView("dataTable", dataTable)
    val resultSqlTable: Table = tableEnv.sqlQuery(
      """
        |select id, temperature
        |from dataTable
        |where id = 'sensor_1'
        |""".stripMargin)

    // 4. Convert the SQL result back to a stream and print it.
    val resultStream: DataStream[(String, Double)] = resultSqlTable
      .toAppendStream[(String, Double)]
    resultStream.print()

    env.execute("table api example job")
  }
}
